Skip to main content

安装

安装包下载

链接:https://pan.baidu.com/s/1nBrOPbvxn-tVq1DBwEkkaw 
提取码:7b1i
--来自百度网盘超级会员V5的分享

修改配置文件

my_env.sh

sudo vim /etc/profile.d/my_env.sh

#HADOOP_HOME
export HADOOP_HOME=/home/bigdata/module/hadoop-3.2.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`

source /etc/profile.d/my_env.sh

加入jar到flink的lib目录

paimon-flink-1.16-0.6-20231101.045607-56.jar
paimon-flink-action-0.6-20231101.045607-55.jar
vim /home/bigdata/module/flink-1.16.0/conf/flink-conf.yaml
classloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 4

execution.checkpointing.interval: 10s
state.backend: rocksdb
state.checkpoints.dir: hdfs://bigdatacluster/ckps
state.backend.incremental: true
#解决中文乱码,1.17之前参数是env.java.opts,1.17之后是env.java.opts.all
env.java.opts: -Dfile.encoding=UTF-8

解决依赖问题

cp /home/bigdata/module/hadoop-3.2.3/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.2.3.jar /home/bigdata/module/flink-1.16.0/lib/

初体验

用yarn-session启动一个集群

/home/bigdata/module/flink-1.16.0/bin/yarn-session.sh -nm test -d

启动Flink的sql-client

/home/bigdata/module/flink-1.16.0/bin/sql-client.sh -s yarn-session

设置显示模式

SET 'sql-client.execution.result-mode' = 'tableau';

创建catalog

  • 文件系统(默认):将元数据和表文件存储在文件系统中。
  • hive:在 hive metastore中存储元数据。用户可以直接从 Hive 访问表。

文件系统

hdfs dfs -mkdir -p /paimon/fs
CREATE CATALOG fs_catalog WITH (
'type' = 'paimon',
'warehouse' = 'hdfs://bigdatacluster/paimon/fs'
);

USE CATALOG fs_catalog;

catalog初始化文件

vim conf/sql-client-init.sql

CREATE CATALOG fs_catalog WITH (
'type' = 'paimon',
'warehouse' = 'hdfs://bigdatacluster/paimon/fs'
);
USE CATALOG fs_catalog;

命令指定执行sql-client的时候先执行初始化文件,就可以直接读取之前创建的catalog了。

bin/sql-client.sh -s yarn-session -i conf/sql-client-init.sql

查看catalog

show catalogs;
show current catalog;

Hive Catalog (生产配置,学习阶段可以跳过)

通过使用Hive Catalog,对Catalog的更改将直接影响相应的hive metastore。在此类Catalog中创建的表也可以直接从 Hive 访问。要使用 Hive Catalog,数据库名称、表名称和字段名称应小写。

  1. 上传 hive-connector。将flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar上传到Flink的lib目录下。

  2. 重启yarn-session集群。

  3. 启动hive的metastore服务。

nohup hive --service metastore &
  1. 创建Hive Catalog。
CREATE CATALOG hive_catalog WITH (
'type' = 'paimon',
'metastore' = 'hive',
'uri' = 'thrift://hadoop102:9083',
'hive-conf-dir' = '/opt/module/hive/conf',
'warehouse' = 'hdfs://hadoop102:8020/paimon/hive'
);

USE CATALOG hive_catalog;
  1. 注意事项。 使用hive Catalog通过alter table更改不兼容的列类型时,参见 HIVE-17832。需要配置。
vim /opt/module/hive/conf/hive-site.xml;

<property>
<name>hive.metastore.disallow.incompatible.col.type.changes</name>
<value>false</value>
</property>

上述配置需要在hive-site.xml中配置,且hive metastore服务需要重启。 如果使用的是 Hive3,请禁用 Hive ACID:

hive.strict.managed.tables=false
hive.create.as.insert.only=false
metastore.create.as.acid=false